/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.dubbo.common.concurrent;

import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.NamedThreadFactory;

import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * <p>A list of listeners, each with an associated {@code Executor}, that
 * guarantees that every {@code Runnable} that is {@linkplain #add added} will
 * be executed after {@link #execute()} is called. Any {@code Runnable} added
 * after the call to {@code execute} is still guaranteed to execute. There is no
 * guarantee, however, that listeners will be executed in the order that they
 * are added.
 * <p>
 * <p>Exceptions thrown by a listener will be propagated up to the executor.
 * Any exception thrown during {@code Executor.execute} (e.g., a {@code
 * RejectedExecutionException} or an exception thrown by {@linkplain
 * MoreExecutors#sameThreadExecutor inline execution}) will be caught and
 * logged.
 */
public final class ExecutionList {
    // Logger to log exceptions caught when running runnables.
    static final Logger logger = LoggerFactory.getLogger(ExecutionList.class.getName());

    /**
     * The runnable, executor pairs to execute.  This acts as a stack threaded through the
     * {@link RunnableExecutorPair#next} field.
     */
    private RunnableExecutorPair runnables;

    private boolean executed;

    private static final Executor DEFAULT_EXECUTOR = new ThreadPoolExecutor(1, 10, 60000L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new NamedThreadFactory("DubboFutureCallbackDefault", true));

    /**
     * Creates a new, empty {@link ExecutionList}.
     */
    public ExecutionList() {
    }

    /**
     * Adds the {@code Runnable} and accompanying {@code Executor} to the list of
     * listeners to execute. If execution has already begun, the listener is
     * executed immediately.
     * <p>
     * <p>Note: For fast, lightweight listeners that would be safe to execute in
     * any thread, consider {@link MoreExecutors#sameThreadExecutor}. For heavier
     * listeners, {@code sameThreadExecutor()} carries some caveats: First, the
     * thread that the listener runs in depends on whether the {@code
     * ExecutionList} has been executed at the time it is added. In particular,
     * listeners may run in the thread that calls {@code add}. Second, the thread
     * that calls {@link #execute} may be an internal implementation thread, such
     * as an RPC network thread, and {@code sameThreadExecutor()} listeners may
     * run in this thread. Finally, during the execution of a {@code
     * sameThreadExecutor} listener, all other registered but unexecuted
     * listeners are prevented from running, even if those listeners are to run
     * in other executors.
     */
    public void add(Runnable runnable, Executor executor) {
        // Fail fast on a null.  We throw NPE here because the contract of
        // Executor states that it throws NPE on null listener, so we propagate
        // that contract up into the add method as well.
        if (runnable == null) {
            throw new NullPointerException("Runnable can not be null!");
        }
        if (executor == null) {
            logger.info("Executor for listenablefuture is null, will use default executor!");
            executor = DEFAULT_EXECUTOR;
        }
        // Lock while we check state.  We must maintain the lock while adding the
        // new pair so that another thread can't run the list out from under us.
        // We only add to the list if we have not yet started execution.
        synchronized (this) {
            if (!executed) {
                runnables = new RunnableExecutorPair(runnable, executor, runnables);
                return;
            }
        }
        // Execute the runnable immediately. Because of scheduling this may end up
        // getting called before some of the previously added runnables, but we're
        // OK with that.  If we want to change the contract to guarantee ordering
        // among runnables we'd have to modify the logic here to allow it.
        executeListener(runnable, executor);
    }

    /**
     * Runs this execution list, executing all existing pairs in the order they
     * were added. However, note that listeners added after this point may be
     * executed before those previously added, and note that the execution order
     * of all listeners is ultimately chosen by the implementations of the
     * supplied executors.
     * <p>
     * <p>This method is idempotent. Calling it several times in parallel is
     * semantically equivalent to calling it exactly once.
     *
     * @since 10.0 (present in 1.0 as {@code run})
     */
    public void execute() {
        // Lock while we update our state so the add method above will finish adding
        // any listeners before we start to run them.
        RunnableExecutorPair list;
        synchronized (this) {
            if (executed) {
                return;
            }
            executed = true;
            list = runnables;
            runnables = null;  // allow GC to free listeners even if this stays around for a while.
        }
        // If we succeeded then list holds all the runnables we to execute.  The pairs in the stack are
        // in the opposite order from how they were added so we need to reverse the list to fulfill our
        // contract.
        // This is somewhat annoying, but turns out to be very fast in practice.  Alternatively, we
        // could drop the contract on the method that enforces this queue like behavior since depending
        // on it is likely to be a bug anyway.

        // N.B. All writes to the list and the next pointers must have happened before the above
        // synchronized block, so we can iterate the list without the lock held here.
        RunnableExecutorPair reversedList = null;
        while (list != null) {
            RunnableExecutorPair tmp = list;
            list = list.next;
            tmp.next = reversedList;
            reversedList = tmp;
        }
        while (reversedList != null) {
            executeListener(reversedList.runnable, reversedList.executor);
            reversedList = reversedList.next;
        }
    }

    /**
     * Submits the given runnable to the given {@link Executor} catching and logging all
     * {@linkplain RuntimeException runtime exceptions} thrown by the executor.
     */
    private static void executeListener(Runnable runnable, Executor executor) {
        try {
            executor.execute(runnable);
        } catch (RuntimeException e) {
            // Log it and keep going, bad runnable and/or executor.  Don't
            // punish the other runnables if we're given a bad one.  We only
            // catch RuntimeException because we want Errors to propagate up.
            logger.error("RuntimeException while executing runnable "
                    + runnable + " with executor " + executor, e);
        }
    }

    private static final class RunnableExecutorPair {
        final Runnable runnable;
        final Executor executor;
        RunnableExecutorPair next;

        RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) {
            this.runnable = runnable;
            this.executor = executor;
            this.next = next;
        }
    }
}
